In [14]:
# Initialize Spark in Python
from pyspark import SparkContext
from pyspark import SparkFiles
sc = SparkContext("local", "Advanced Spark features")
In [50]:
# Load the log file whose blank lines will be counted with an accumulator
file = sc.textFile("log.txt")
In [51]:
blankLines = sc.accumulator(0)
In [52]:
def extractCallSigns(line):
    # Count blank lines on the side while splitting each line into call signs
    global blankLines
    if line == "":
        blankLines += 1
    return line.split(" ")
In [53]:
callSigns = file.flatMap(extractCallSigns)
In [54]:
# The accumulator is only populated once an action forces the flatMap to run
callSigns.count()
print("Blank lines: %d" % blankLines.value)
In [ ]:
# Broadcast the call-sign prefix table (<data> is a placeholder for the actual table)
signPrefixes = sc.broadcast(<data>)

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

# contactCounts and lookupCountry are assumed to be defined earlier
# (hypothetical stand-ins are sketched in the following cell)
countryContactCounts = (contactCounts
    .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
    .reduceByKey(lambda x, y: x + y))
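The broadcast cell above refers to contactCounts and lookupCountry, which are not defined in this notebook. Below is a minimal, self-contained sketch of hypothetical stand-ins so the cell can be exercised end to end; the prefix table, sample counts, and lookup logic are assumptions for illustration only:
In [ ]:
# Hypothetical stand-ins for the names used above (illustrative only)
signPrefixes = sc.broadcast({"W": "USA", "VE": "Canada", "G": "UK"})

def lookupCountry(sign, prefixes):
    # Take the longest prefix of the call sign that appears in the table
    for length in range(len(sign), 0, -1):
        if sign[:length] in prefixes:
            return prefixes[sign[:length]]
    return "Unknown"

contactCounts = sc.parallelize([("W1AW", 3), ("VE3ABC", 1), ("W6XYZ", 2)])
countryContactCounts = (contactCounts
    .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
    .reduceByKey(lambda x, y: x + y))
countryContactCounts.collect()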
In [2]:
# Placeholder data: for the pipe example below this should be a pair RDD mapping
# each call sign to a list of contact dicts with mylat/mylong/contactlat/contactlong fields
contactsContactList = sc.parallelize([23, 34, 33, 23])
In [3]:
# Local path of the R script to ship to the workers, and the bare file name
# used to look it up there via SparkFiles.get()
distScript = "finddistance.R"
distScriptName = "finddistance.R"
sc.addFile(distScript)
In [4]:
def hasDistInfo(call):
    requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
    return all(map(lambda f: call[f], requiredFields))
In [5]:
def formatCall(call):
    return "{0},{1},{2},{3}".format(
        call["mylat"], call["mylong"], call["contactlat"], call["contactlong"])
In [6]:
# One comma-separated line per contact that has complete position data
pipeInputs = contactsContactList.values().flatMap(
    lambda calls: map(formatCall, filter(hasDistInfo, calls)))
In [7]:
# Each element of pipeInputs is written to the script's stdin as one line;
# each line of its stdout becomes an element of the distances RDD
distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
In [ ]:
distances.collect()
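Because pipe() exchanges plain text with the external script, the distances come back as strings. Below is a hedged sketch of the post-processing such a workflow typically needs; the parseDistance helper and its error handling are assumptions, not taken from the original notebook:
In [ ]:
# Convert the string output of the R script back to floats,
# skipping any lines that cannot be parsed (illustrative cleanup)
def parseDistance(value):
    try:
        return [float(value)]
    except ValueError:
        return []

distanceValues = distances.flatMap(parseDistance)
distanceValues.collect()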
In [15]:
rdd = sc.parallelize([23,34,33,23])
In [16]:
rdd.count()
Out[16]:
In [17]:
rdd.mean()
Out[17]:
In [18]:
rdd.sum()
Out[18]:
In [19]:
rdd.max()
Out[19]:
In [20]:
rdd.min()
Out[20]:
In [21]:
rdd.variance()
Out[21]:
In [22]:
rdd.sampleVariance()
Out[22]:
In [23]:
rdd.stdev()
Out[23]:
In [24]:
rdd.sampleStdev()
Out[24]:
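Each of the calls above runs as a separate action. When several summary statistics are needed at once, rdd.stats() computes them in a single pass and returns a StatCounter whose accessors mirror the methods above:
In [ ]:
# One pass over the data instead of one action per statistic
stats = rdd.stats()
print(stats.count(), stats.mean(), stats.stdev(), stats.min(), stats.max())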
In [ ]: